Introduction to DataFrame API

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

The trend is to use DataFrames instead of RDDs.

One of the main benefits of the DataFrame approach is that it is easier to use and more user-friendly than the RDD one. The RDD API is still present, but it has been put into maintenance mode: it will no longer be extended and will be deprecated once the DataFrame API reaches feature parity with it.

A DataFrame is a Spark Dataset (in short, a distributed, strongly-typed collection of data; the interface was introduced in Spark 1.6) organized into named columns, which represent the variables.

Characteristics

What are the main selling points and benefits of using the DataFrame API over the older RDD one? Here are a few:

  • Familiarity - as mentioned before, the concept is analogous to the widely known approach of manipulating data as tables in relational databases, or to the data frame construct in e.g. R.
  • Uniform API - the API is consistent across the languages, so we don't waste time accommodating the differences and can focus on what's important.
  • Spark SQL - it enables us to access and manipulate the data via SQL queries and a SQL-like domain-specific language.
  • Optimizations - a set of optimizations implemented under the hood of Dataset gives us better performance when handling the data.
  • Multitude of possible sources - we can construct a DataFrame from external databases, existing RDDs, CSV files, JSON and many other structured data sources.

Starting Point: SparkSession

The entry point into all functionality in Spark is the SparkSession class.

To create a basic SparkSession, just use SparkSession.builder:


In [ ]:
from pyspark.sql import SparkSession
import pyspark 

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Creating a DataFrame

We have multiple possible sources from which we can create a DataFrame. To load a Dataset from an external source we use the DataFrameReader interface (its streaming counterpart, the DataStreamReader, is used for streaming Datasets).

The DataFrameReader for the session can be obtained by calling the read method on the SparkSession instance.

We can add input options for the underlying data source by calling the option method on the reader instance. It takes a key and a value as arguments (or a whole map of options).

There are two approaches to loading the data:

  • Format-specific methods like csv, jdbc, etc.
  • Specifying the format explicitly with the format method and then calling the generic load method. If no format is specified, Parquet is the default; both approaches are sketched below.
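
A minimal sketch of both approaches, using the sample files that appear later in this section (the CSV header option is an assumption about the file):


In [ ]:
# 1. Format-specific method (option sets a reader option before loading)
df_csv = spark.read.option("header", "true").csv("../data/people.csv")

# 2. Generic load with the format named explicitly
df_json = spark.read.format("json").load("../data/people.json")

# 3. Generic load without a format falls back to Parquet
df_parquet = spark.read.load("../data/users.parquet")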

Here are the most common use cases when it comes to creating a DataFrame, and the methods used:

From Parquet

Parquet is a columnar storage format developed by Apache for projects in the Hadoop/Spark ecosystems.

We load it by calling the load or parquet methods with the path to the Parquet file as the argument, e.g.:


In [ ]:
df1 = spark.read.load("../data/users.parquet")

In [ ]:
df1.show()
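
For completeness, the equivalent load using the format-specific parquet method:


In [ ]:
df1b = spark.read.parquet("../data/users.parquet")
df1b.show()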

From CSV

The well-known comma-separated values format. Spark can infer the schema of a loaded CSV file, provided schema inference is requested (by default every column is read as a string).

We load it by calling the csv method with the path to the CSV file as the argument, e.g.:


In [ ]:
df2 = spark.read.csv("../data/people.csv")

In [ ]:
df2.show()
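
A sketch of the same load with common CSV reader options (header row, schema inference, explicit delimiter); whether these options actually match people.csv is an assumption:


In [ ]:
df2b = spark.read.csv("../data/people.csv", header=True, inferSchema=True, sep=",")
df2b.printSchema()
df2b.show()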

From JSON

The JavaScript Object Notation format, widely used by web applications for asynchronous frontend/backend communication. Spark can automatically infer the schema of a loaded JSON file; note that by default it expects one JSON object per line.

We load it by calling the json method with the path to the JSON file as the argument, e.g.:


In [ ]:
df3 = spark.read.json("../data/people.json")

In [ ]:
df3.show()
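
The inferred schema can be inspected with printSchema. For a pretty-printed (multi-line) JSON file the multiLine reader option would be needed; people.json is assumed to already be line-delimited, so that variant is only shown as a commented-out sketch:


In [ ]:
# Inspect the schema Spark inferred from the JSON file
df3.printSchema()

# Only needed when a single JSON document spans multiple lines:
# df3_ml = spark.read.json("../data/people.json", multiLine=True)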

Other Formats

  • From Hive: Apache Hive is a data warehouse software package. For interfacing DataFrames with Hive we need a SparkSession with Hive support enabled and all the needed dependencies on the classpath so that Spark can load them automatically.

    We will not cover interfacing with a Hive data store, as this would require a deeper understanding of what Hive is and how it works. For more information, please consult the official documentation on the subject.

  • From Database: We can interface with virtually any database via JDBC. For this to work, the JDBC driver for the database in question needs to be on the classpath. We will not cover this in depth, as it would require a specific database setup, but a purely illustrative sketch follows below.
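
To show the shape of the JDBC reader anyway, here is a purely illustrative sketch; the URL, table name and credentials are hypothetical, and the matching JDBC driver (PostgreSQL in this case) would have to be on the classpath:


In [ ]:
# Hypothetical connection details -- adjust to an actual database
df_db = (spark.read
         .format("jdbc")
         .option("url", "jdbc:postgresql://localhost:5432/mydb")
         .option("dbtable", "public.people")
         .option("user", "username")
         .option("password", "password")
         .load())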

From RDD

We can convert an RDD into a DataFrame. In PySpark the field names of the Row objects (or, in Scala, the argument names of the case classes) become the column names; nested complex types such as arrays and sequences are supported.

All we need to do is simply call the toDF method on the RDD, e.g.:


In [ ]:
# Build an RDD of Row objects; the field names (a, b) become the column names
rdd = spark.sparkContext.parallelize([pyspark.sql.Row(a=1,b=1),
                                      pyspark.sql.Row(a=2,b=2),
                                      pyspark.sql.Row(a=3,b=3),
                                      pyspark.sql.Row(a=4,b=4)])
df4 = rdd.toDF()
df4.show()
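
An RDD of plain tuples can also be converted by passing the column names to toDF explicitly (the names used here are arbitrary):


In [ ]:
rdd2 = spark.sparkContext.parallelize([(1, "x"), (2, "y")])
df5 = rdd2.toDF(["id", "label"])
df5.show()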

Untyped Dataset Operations (aka DataFrame Operations)

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.

As mentioned above, since Spark 2.0 DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as “untyped transformations”, in contrast to the “typed transformations” that come with the strongly typed Scala/Java Datasets.

Here we include some basic examples of structured data processing using Datasets:


In [ ]:
df = spark.read.json("../data/people.json")
# Print the schema in a tree format
df.printSchema()

In [ ]:
# Select only the "name" column
df.select("name").show()

In [ ]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

In [ ]:
# Select people older than 21
df.filter(df['age'] > 21).show()

In [ ]:
# Count people by age
df.groupBy("age").count().show()
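
These untyped operations compose naturally. As one more small sketch using the same columns, we can filter out missing ages and order the result:


In [ ]:
# Drop rows with a null age, then sort by age in descending order
df.filter(df["age"].isNotNull()).orderBy(df["age"].desc()).show()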

Running SQL Queries Programmatically

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.


In [ ]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
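
Any SQL supported by Spark can be run against the registered view, for example a query with a WHERE clause (the age column comes from the people.json file loaded above):


In [ ]:
# Same temporary view, filtered in SQL
spark.sql("SELECT name FROM people WHERE age > 21").show()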

Interoperating with RDDs

Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

Inferring the Schema Using Reflection

Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes.

Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.

The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.


In [ ]:
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# .rdd returns the content as a pyspark RDD of Row objects.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

Programmatically Specifying the Schema

When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of tuples or lists from the original RDD;
  2. Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1;
  3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.

For example:


In [ ]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+